
package io.github.hpsocket.soa.framework.leaf.segment;

import io.github.hpsocket.soa.framework.leaf.common.Result;
import io.github.hpsocket.soa.framework.leaf.common.Status;
import io.github.hpsocket.soa.framework.leaf.segment.dao.IdAllocDao;
import io.github.hpsocket.soa.framework.leaf.segment.model.*;
import io.github.hpsocket.soa.framework.leaf.service.IdGen;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

@Slf4j
public class SegmentIdGenImpl implements IdGen
{
    /**
     * IDCache未初始化成功时的异常码
     */
    private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
    /**
     * key不存在时的异常码
     */
    private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
    /**
     * SegmentBuffer中的两个Segment均未从DB中装载时的异常码
     */
    private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
    /**
     * 最大步长不超过100,0000
     */
    private static final int MAX_STEP = 1000000;
    /**
     * 一个Segment维持时间为15分钟
     */
    private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
    private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());
    private volatile boolean initOK = false;

    @Getter
    private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();

    @Getter
    @Setter
    private IdAllocDao dao;

    public static class UpdateThreadFactory implements ThreadFactory
    {
        private static int threadInitNumber = 0;

        private static synchronized int nextThreadNum()
        {
            return threadInitNumber++;
        }

        @Override
        public Thread newThread(Runnable r)
        {
            return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
        }
    }

    @Override
    public boolean init()
    {
        // log.info("Init ...");
        // 确保加载到kv后才初始化成功
        updateCacheFromDb();
        
        initOK = true;
        
        updateCacheFromDbAtEveryMinute();
        
        return initOK;
    }

    private void updateCacheFromDbAtEveryMinute()
    {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
        {
            @Override
            public Thread newThread(Runnable r)
            {
                Thread t = new Thread(r);
                t.setName("check-idCache-thread");
                t.setDaemon(true);
                return t;
            }
        });
        
        service.scheduleWithFixedDelay(new Runnable()
        {
            @Override
            public void run()
            {
                updateCacheFromDb();
            }
        }, 60, 60, TimeUnit.SECONDS);
    }

    private void updateCacheFromDb()
    {
        // log.info("update cache from db");
        // StopWatch sw = new Slf4JStopWatch();
        try
        {
            List<String> dbTags = dao.getAllTags();
            if(dbTags == null || dbTags.isEmpty())
            {
                return;
            }
            
            List<String> cacheTags = new ArrayList<String>(cache.keySet());
            Set<String> insertTagsSet = new HashSet<>(dbTags);
            Set<String> removeTagsSet = new HashSet<>(cacheTags);
            
            // db中新加的tags灌进cache
            for(int i = 0; i < cacheTags.size(); i++)
            {
                String tmp = cacheTags.get(i);
                if(insertTagsSet.contains(tmp))
                {
                    insertTagsSet.remove(tmp);
                }
            }
            
            for(String tag : insertTagsSet)
            {
                SegmentBuffer buffer = new SegmentBuffer();
                buffer.setKey(tag);
                Segment segment = buffer.getCurrent();
                segment.setValue(new AtomicLong(0));
                segment.setMax(0);
                segment.setStep(0);
                cache.put(tag, buffer);
                // log.info("Add tag {} from db to IdCache, SegmentBuffer
                // {}", tag, buffer);
            }
            
            // cache中已失效的tags从cache删除
            for(int i = 0; i < dbTags.size(); i++)
            {
                String tmp = dbTags.get(i);
                if(removeTagsSet.contains(tmp))
                {
                    removeTagsSet.remove(tmp);
                }
            }
            
            for(String tag : removeTagsSet)
            {
                cache.remove(tag);
                // log.info("Remove tag {} from IdCache", tag);
            }
        }
        catch(Exception e)
        {
            log.warn("update cache from db exception", e);
        }
        finally
        {
            // sw.stop("updateCacheFromDb");
        }
    }

    @Override
    public Result get(final String key)
    {
        if(!initOK)
        {
            return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
        }
        
        if(cache.containsKey(key))
        {
            SegmentBuffer buffer = cache.get(key);
            
            if(!buffer.isInitOk())
            {
                synchronized(buffer)
                {
                    if(!buffer.isInitOk())
                    {
                        try
                        {
                            updateSegmentFromDb(key, buffer.getCurrent());
                            // log.info("Init buffer. Update leafkey {} {}
                            // from db", key, buffer.getCurrent());
                            buffer.setInitOk(true);
                        }
                        catch(Exception e)
                        {
                            log.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        }
                    }
                }
            }
            
            return getIdFromSegmentBuffer(cache.get(key));
        }
        
        return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
    }

    public void updateSegmentFromDb(String key, Segment segment)
    {
        // StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
        LeafAlloc leafAlloc;
        
        if(!buffer.isInitOk())
        {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());// leafAlloc中的step为DB中的step
        }
        else if(buffer.getUpdateTimestamp() == 0)
        {
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(leafAlloc.getStep());
            buffer.setMinStep(leafAlloc.getStep());// leafAlloc中的step为DB中的step
        }
        else
        {
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
            
            if(duration < SEGMENT_DURATION)
            {
                if(nextStep * 2 > MAX_STEP)
                {
                    // do nothing
                }
                else
                {
                    nextStep = nextStep * 2;
                }
            }
            else if(duration < SEGMENT_DURATION * 2)
            {
                // do nothing with nextStep
            }
            else
            {
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
            
            // log.info("leafKey[{}], step[{}], duration[{}mins],
            // nextStep[{}]", key, buffer.getStep(),
            // String.format("%.2f",((double)duration / (1000 * 60))),
            // nextStep);
            LeafAlloc temp = new LeafAlloc();
            temp.setKey(key);
            temp.setStep(nextStep);
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            buffer.setStep(nextStep);
            buffer.setMinStep(leafAlloc.getStep());// leafAlloc的step为DB中的step
        }
        
        // must set value before set max
        long value = leafAlloc.getMaxId() - buffer.getStep();
        segment.getValue().set(value);
        segment.setMax(leafAlloc.getMaxId());
        segment.setStep(buffer.getStep());
        // sw.stop("updateSegmentFromDb", key + " " + segment);
    }

    public Result getIdFromSegmentBuffer(final SegmentBuffer buffer)
    {
        while(true)
        {
            buffer.rLock().lock();
            try
            {
                final Segment segment = buffer.getCurrent();
                
                if(!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true))
                {
                    service.execute(new Runnable()
                    {
                        @Override
                        public void run()
                        {
                            Segment next = buffer.getSegments()[buffer.nextPos()];
                            boolean updateOk = false;
                            
                            try
                            {
                                updateSegmentFromDb(buffer.getKey(), next);
                                updateOk = true;
                                // log.info("update segment {} from db {}",
                                // buffer.getKey(), next);
                            }
                            catch(Exception e)
                            {
                                log.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                            }
                            finally
                            {
                                if(updateOk)
                                {
                                    buffer.wLock().lock();
                                    buffer.setNextReady(true);
                                    buffer.getThreadRunning().set(false);
                                    buffer.wLock().unlock();
                                }
                                else
                                {
                                    buffer.getThreadRunning().set(false);
                                }
                            }
                        }
                    });
                }
                
                long value = segment.getValue().getAndIncrement();
                
                if(value < segment.getMax())
                {
                    return new Result(value, Status.SUCCESS);
                }
            }
            finally
            {
                buffer.rLock().unlock();
            }
            
            waitAndSleep(buffer);
            buffer.wLock().lock();
            
            try
            {
                final Segment segment = buffer.getCurrent();
                long value = segment.getValue().getAndIncrement();
                
                if(value < segment.getMax())
                {
                    return new Result(value, Status.SUCCESS);
                }
                
                if(buffer.isNextReady())
                {
                    buffer.switchPos();
                    buffer.setNextReady(false);
                }
                else
                {
                    log.error("Both two segments in {} are not ready!", buffer);
                    return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                }
            }
            finally
            {
                buffer.wLock().unlock();
            }
        }
    }

    private void waitAndSleep(SegmentBuffer buffer)
    {
        int roll = 0;
        
        while(buffer.getThreadRunning().get())
        {
            if((++roll) > 2000)
            {
                break;
            }

            if((roll % 100) == 0)
            {
                try
                {
                    TimeUnit.MILLISECONDS.sleep(1);
                }
                catch(InterruptedException e)
                {
                    log.warn("Thread {} Interrupted", Thread.currentThread().getName());
                    break;
                }
            }
        }
    }

    public List<LeafAlloc> getAllLeafAllocs()
    {
        return dao.getAllLeafAllocs();
    }
}
